Technical notes • dingxiao • July 16, 2018, 15:36
Reference project: [VideoAndMQTT](https://github.com/titos-carrasco/VideoAndMQTT)
The reference project is implemented in Python. This post borrows the approach used by its HTML page, whose code is:
<!DOCTYPE html PUBLIC '-//W3C//DTD XHTML 1.0 Strict//EN' 'http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd'>
<html xmlns='http://www.w3.org/1999/xhtml' xml:lang='es'>
<head>
<title>Video and WS/MQTT (paho lib)</title>
<meta http-equiv='content-type' content='text/html;charset=utf-8' />
<script src='https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js' type='text/javascript'></script>
<script type='text/javascript'>
// put here the mqtt connections parameters
var streams = [
[ "xx.xx.xx.xx", xxxx, "clientId", "pic", "rcr_" + createUUID() ],
// [ "broker.hivemq.com", 8000, "/mqtt", "demos/rcr/video2", "rcr_" + createUUID() ],
// [ "broker.hivemq.com", 8000, "/mqtt", "demos/rcr/video3", "rcr_" + createUUID() ],
];
function createUUID() {
var s = [];
var hexDigits = "0123456789abcdef";
for (var i = 0; i < 36; i++) {
s[i] = hexDigits.substr(Math.floor(Math.random() * 0x10), 1);
}
s[14] = "4"; // bits 12-15 of the time_hi_and_version field to 0010
s[19] = hexDigits.substr((s[19] & 0x3) | 0x8, 1); // bits 6-7 of the clock_seq_hi_and_reserved to 01
s[8] = s[13] = s[18] = s[23] = "-";
var uuid = s.join("");
return uuid;
}
function Connect()
{
for( i=0; i<streams.length; i++ ){
img = document.createElement( 'img' );
img.width = "320";
img.height = "240";
img.src='';
img.id = 'image_' + i;
container = document.getElementById( 'container' );
container.appendChild( img );
stream = streams[i]
// client = new Paho.MQTT.Client( stream[0],stream[1], stream[2], stream[4] );
client = new Paho.MQTT.Client( stream[0],stream[1], stream[2]);
client.idx = i
client.onMessageArrived = function ( message ){
data = message.payloadBytes;
img = document.getElementById( 'image_' + this.idx )
img.src = 'data:image/png;base64,' + btoa( String.fromCharCode.apply( null, data ) );
}.bind( client );
function onConnect(){
stream = streams[ this.idx ]
this.subscribe( stream[3] );
}
client.connect( { onSuccess: onConnect.bind( client ) } );
}
}
</script>
</head>
<body onload='Connect();'>
<h1>Video and WS/MQTT (paho lib)</h1>
<div id='container'></div>
</body>
</html>
The key part is:
client.onMessageArrived = function ( message ){
data = message.payloadBytes;
img = document.getElementById( 'image_' + this.idx )
img.src = 'data:image/png;base64,' + btoa( String.fromCharCode.apply( null, data ) );
}.bind( client );
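This handler receives the image bytes published over MQTT on the HTML side, base64-encodes them, and assigns the result to the image as a data URI so the browser can display it.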
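To make the conversion concrete, here is a minimal Java sketch of the same transformation the `onMessageArrived` handler performs: raw payload bytes in, base64 data URI out. The class and method names (`ImageDataUri`, `mimeType`, `toDataUri`) are hypothetical and not part of the referenced project. The sketch also inspects the leading magic bytes so that the MIME type matches the actual format; the page hard-codes `image/png` while the sender in this post publishes JPEG, a mismatch that browsers typically tolerate but that is easy to avoid.

```java
import java.util.Base64;

/** Hypothetical helper: turns raw image bytes (e.g. an MQTT payload) into a data URI. */
final class ImageDataUri {
    private ImageDataUri() {}

    /** Guesses the MIME type from the leading magic bytes; only PNG and JPEG are recognized. */
    static String mimeType(byte[] data) {
        // PNG files start with 0x89 'P' 'N' 'G'
        if (data.length >= 4 && (data[0] & 0xFF) == 0x89
                && data[1] == 'P' && data[2] == 'N' && data[3] == 'G') {
            return "image/png";
        }
        // JPEG files start with 0xFF 0xD8 0xFF
        if (data.length >= 3 && (data[0] & 0xFF) == 0xFF
                && (data[1] & 0xFF) == 0xD8 && (data[2] & 0xFF) == 0xFF) {
            return "image/jpeg";
        }
        return "application/octet-stream"; // unknown format
    }

    /** Builds "data:<mime>;base64,<payload>" from the raw bytes. */
    static String toDataUri(byte[] data) {
        return "data:" + mimeType(data) + ";base64,"
                + Base64.getEncoder().encodeToString(data);
    }
}
```

The same magic-byte check could be ported to the JavaScript handler to choose between `data:image/png` and `data:image/jpeg` at runtime.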
The sender is implemented in Java. Its MQTT initialization code is:
private void Mqtt_Init()
{
MemoryPersistence persistence = new MemoryPersistence();
try {
sampleClient = new MqttAsyncClient(broker, "dx", persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
sampleClient.setCallback(null);
System.out.println("Connecting to broker: " + broker);
// connect() on MqttAsyncClient is asynchronous; block until the broker acknowledges it
sampleClient.connect(connOpts).waitForCompletion();
System.out.println("Connected");
sampleClient.subscribe("dx", 0);
System.out.println("Subscribed");
} catch (Exception me) {
if (me instanceof MqttException) {
System.out.println("reason " + ((MqttException) me).getReasonCode());
}
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
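In the configuration above, the connecting client must have its callback configured via `setCallback` (this sample passes `null`).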
For image encoding, see the article "Convert Image to Base64 String or Base64 String to Image in Java". Reference code:
package com;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import org.apache.commons.codec.binary.Base64;
public class ImageToBase64 {
public static void main(String...s) throws Exception{
//encode image to Base64 String
File f = new File("H:/encode/sourceImage.png"); //change path of image according to you
FileInputStream fis = new FileInputStream(f);
byte byteArray[] = new byte[(int)f.length()];
fis.read(byteArray);
String imageString = Base64.encodeBase64String(byteArray);
//decode Base64 String to image
FileOutputStream fos = new FileOutputStream("H:/decode/destinationImage.png"); //change path of image according to you
byteArray = Base64.decodeBase64(imageString);
fos.write(byteArray);
fis.close();
fos.close();
}
}
The sender converts the captured image and publishes it over MQTT with the following code:
private void Mqtt_Send_Pic()
{
MainController_opencv.Mqtt_Lock = true;
try {
try {
ImageIO.write(ImageIO.read(new File("/home/pi/mqtt.png")), "JPEG", new File("/home/pi/mqtt.jpg"));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//read the JPEG file into a byte array (raw bytes are published; Base64 encoding is not used)
File f = new File("/home/pi/mqtt.jpg"); //change path of image according to you
FileInputStream fis = new FileInputStream(f);
byte byteArray[] = new byte[(int)f.length()];
fis.read(byteArray);
//String imageString = Base64.encodeBase64String(byteArray);
sampleClient.publish("pic", byteArray, 0, false);
fis.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
MainController_opencv.Mqtt_Lock = false;
}
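Because the images are captured with OpenCV, the raw data is first saved as a PNG file, so it has to be converted to JPG to meet the publishing requirements before it is sent. When publishing, the image data is sent as a raw byte stream.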
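The conversion step can also be done in memory, without writing an intermediate `.jpg` file. The following is a minimal sketch under that assumption, not the author's implementation; `JpegEncoder` and `toJpegBytes` are hypothetical names. Since JPEG (JFIF) carries no alpha channel, the sketch first redraws the image onto an opaque RGB canvas, which matters when the PNG source has transparency.

```java
import java.awt.Color;
import java.awt.Graphics2D;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.imageio.ImageIO;

/** Hypothetical helper: encodes an image as JPEG bytes usable as an MQTT payload. */
final class JpegEncoder {
    private JpegEncoder() {}

    static byte[] toJpegBytes(BufferedImage source) throws IOException {
        // Redraw onto an opaque RGB canvas; transparent pixels become white.
        BufferedImage rgb = new BufferedImage(
                source.getWidth(), source.getHeight(), BufferedImage.TYPE_INT_RGB);
        Graphics2D g = rgb.createGraphics();
        try {
            g.drawImage(source, 0, 0, Color.WHITE, null);
        } finally {
            g.dispose();
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // ImageIO.write returns false when no suitable writer is found.
        if (!ImageIO.write(rgb, "jpg", out)) {
            throw new IOException("no JPEG writer available");
        }
        return out.toByteArray();
    }
}
```

With the file used in this post, `JpegEncoder.toJpegBytes(ImageIO.read(new File("/home/pi/mqtt.png")))` would produce the byte array that is then passed to `publish`.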
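When publishing image data over MQTT, the limits of the network environment and of a small single-board computer can cause the connection to drop during a publish; this happened frequently in real tests. MQTT's own mechanisms detect the dropped connection, after which the client can be reconnected automatically.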
Add exception handling around the MQTT publish call:
sampleClient.publish("pic", byteArray, 0, false);
The catch block is:
catch (Exception e)
{
// TODO Auto-generated catch block
//e.printStackTrace();
System.out.println("send error");
MQTT_ConectLost_Flag = false;
// re-initialize the MQTT interface
Mqtt_Init();
}
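The catch-and-reinitialize pattern above can be generalized into a bounded retry loop. The sketch below is library-independent and purely illustrative: `Publisher` and `Reconnector` are hypothetical interfaces standing in for `sampleClient.publish(...)` and `Mqtt_Init()`, and the attempt limit and delay are arbitrary parameters rather than values taken from the original code.

```java
/** Hypothetical sketch: retry a publish, re-initializing the connection after each failure. */
final class RetryingPublisher {
    /** Stand-in for a call such as sampleClient.publish("pic", payload, 0, false). */
    interface Publisher {
        void publish(byte[] payload) throws Exception;
    }

    /** Stand-in for a re-initialization routine such as Mqtt_Init(). */
    interface Reconnector {
        void reconnect() throws Exception;
    }

    private RetryingPublisher() {}

    /** Returns the number of attempts used; rethrows the last error if every attempt fails. */
    static int publishWithRetry(Publisher publisher, Reconnector reconnector,
                                byte[] payload, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                publisher.publish(payload);
                return attempt;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // give the network a moment to recover
                    try {
                        reconnector.reconnect();
                    } catch (Exception reconnectError) {
                        last = reconnectError; // the next publish attempt will likely fail too
                    }
                }
            }
        }
        throw last;
    }
}
```

Bounding the number of attempts keeps a persistent outage from blocking the capture loop indefinitely; the caller can simply drop the frame when the call finally throws.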